[MongoDB Replication] Raw Change Streams#591

Merged
rkistner merged 23 commits into main from raw-change-streams-2
Apr 13, 2026

@rkistner
Contributor

@rkistner rkistner commented Apr 3, 2026

Supersedes #309.

This switches to using aggregate/getMore commands directly, instead of the built-in watch methods.

Motivations:

  1. This gives us much more control over the entire process, instead of fighting the mongodb driver.
  2. By getting data back in batches, we can avoid the overhead from repeatedly calling tryNext() for every individual change (impact not measured yet).
  3. By parsing documents as we're processing them (instead of a batch at a time), we can slightly reduce memory usage.
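The aggregate/getMore loop described above can be sketched roughly as follows. This is not the RawChangeStream from this PR: `streamBatches`, `RunCommand`, `CursorReply` and the `maxBatches` limit are hypothetical names introduced for illustration, and the command runner is injected so the loop can be exercised without a server. Only the `aggregate` and `getMore` command shapes and the `firstBatch` / `nextBatch` / `postBatchResumeToken` reply fields come from the MongoDB server protocol.

```typescript
// Hypothetical sketch; not the implementation from this PR.
// A plain number stands in for the cursor id here. A real server returns,
// and expects in getMore, a 64-bit integer.
type CursorId = number;

interface CursorReply<T> {
  cursor: {
    id: CursorId;
    firstBatch?: T[]; // present on the aggregate reply
    nextBatch?: T[]; // present on getMore replies
    postBatchResumeToken?: unknown;
  };
}

// Stand-in for a function that sends one database command.
type RunCommand<T> = (command: Record<string, unknown>) => Promise<CursorReply<T>>;

async function streamBatches<T>(
  runCommand: RunCommand<T>,
  collection: string,
  options: { batchSize: number; maxTimeMS: number; maxBatches: number },
  onBatch: (events: T[], resumeToken: unknown) => void
): Promise<void> {
  // Open the stream: $changeStream has to be the first pipeline stage.
  let reply = await runCommand({
    aggregate: collection,
    pipeline: [{ $changeStream: {} }],
    cursor: { batchSize: options.batchSize }
  });
  for (let batches = 1; ; batches++) {
    const events = reply.cursor.firstBatch || reply.cursor.nextBatch || [];
    // One callback per batch, instead of one tryNext() call per change.
    onBatch(events, reply.cursor.postBatchResumeToken);
    // Cursor id 0 means the server closed the cursor.
    // maxBatches only exists to keep this sketch finite.
    if (reply.cursor.id === 0 || batches >= options.maxBatches) return;
    reply = await runCommand({
      getMore: reply.cursor.id,
      collection,
      batchSize: options.batchSize,
      maxTimeMS: options.maxTimeMS
    });
  }
}
```

A real implementation would also need to send `killCursors` when it stops early, and to handle errors and resumption, which this sketch leaves out.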

This removes hacks previously using private APIs from:

This is also the first step in writing a more efficient conversion for raw BSON -> serialized JSON. We can't do that with the driver's change stream implementation, since that doesn't support getting results as raw BSON.
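To illustrate why raw BSON access matters for that conversion, here is a minimal sketch of walking a raw batch one document at a time without deserializing it. It assumes the batch is available as the bytes of a BSON array whose elements are embedded documents; `forEachRawDocument` is a hypothetical helper, not code from this PR. It relies only on the BSON layout: a little-endian int32 length prefix, then elements made of a type byte, a NUL-terminated key and a value, then a trailing NUL.

```typescript
// Hypothetical helper; a sketch of the idea, not the code from this PR.
// Visits each embedded document of a raw BSON array as a zero-copy view.
function forEachRawDocument(
  rawArray: Uint8Array,
  visit: (doc: Uint8Array, index: number) => void
): number {
  const view = new DataView(rawArray.buffer, rawArray.byteOffset, rawArray.byteLength);
  const total = view.getInt32(0, true); // little-endian length prefix
  if (total !== rawArray.byteLength || total < 5) {
    throw new Error('Invalid BSON array length');
  }
  let offset = 4;
  let count = 0;
  // The last byte is the terminating NUL of the array itself.
  while (offset < total - 1) {
    const type = rawArray[offset++];
    // Skip the NUL-terminated element key ("0", "1", ...).
    while (offset < total && rawArray[offset] !== 0) offset++;
    offset++;
    if (type !== 0x03) {
      throw new Error('Expected an embedded document, got type ' + type);
    }
    if (offset + 4 > total) throw new Error('Truncated element');
    const size = view.getInt32(offset, true);
    if (size < 5 || offset + size > total - 1) {
      throw new Error('Invalid embedded document length');
    }
    // subarray() creates a view on the same memory, so nothing is copied.
    visit(rawArray.subarray(offset, offset + size), count++);
    offset += size;
  }
  return count;
}
```

Each view handed to `visit` could then be converted to JSON and released before the next one is touched, which is the kind of per-document processing the description refers to.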

TODO:

  • Handle automatic retries according to the specification.
  • Confirm that we're following the remainder of the specification exactly.
  • Make sure we have proper unit tests, especially for cluster-level streams.
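For the first TODO item, the resume rules in the MongoDB change streams driver specification can be sketched as below. `StreamState` and `resumeStageOptions` are hypothetical names, the sketch only covers how the `$changeStream` stage options are chosen when resuming, and it leaves out the specification's wire-version check for `startAtOperationTime` as well as the classification of which errors are resumable.

```typescript
// Hypothetical sketch of choosing $changeStream options for a resume attempt.
interface StreamState {
  // Options the stream was originally opened with.
  original: Record<string, unknown>;
  // Most recent resume token seen (from an event or postBatchResumeToken).
  cachedResumeToken?: unknown;
  // Operation time saved from the original aggregate reply, if any.
  savedOperationTime?: unknown;
  // Whether any change document has been returned to the caller yet.
  hasReturnedDocument: boolean;
}

function resumeStageOptions(state: StreamState): Record<string, unknown> {
  // Start from the original options, without any of the three start positions.
  const rest: Record<string, unknown> = { ...state.original };
  delete rest['resumeAfter'];
  delete rest['startAfter'];
  delete rest['startAtOperationTime'];

  if (state.cachedResumeToken != null) {
    const startedWithStartAfter = state.original['startAfter'] != null;
    if (startedWithStartAfter && !state.hasReturnedDocument) {
      // Nothing returned yet: keep using startAfter with the cached token.
      return { ...rest, startAfter: state.cachedResumeToken };
    }
    return { ...rest, resumeAfter: state.cachedResumeToken };
  }

  const originalTime = state.original['startAtOperationTime'];
  const operationTime = originalTime != null ? originalTime : state.savedOperationTime;
  if (operationTime != null) {
    return { ...rest, startAtOperationTime: operationTime };
  }
  // No token and no operation time: rerun the original command unchanged.
  return { ...state.original };
}
```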

@changeset-bot

changeset-bot bot commented Apr 3, 2026

🦋 Changeset detected

Latest commit: a011930

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 12 packages
| Name | Type |
| --- | --- |
| @powersync/service-module-mongodb | Minor |
| @powersync/service-schema | Patch |
| @powersync/service-image | Patch |
| @powersync/service-core | Patch |
| @powersync/service-core-tests | Patch |
| @powersync/service-module-core | Patch |
| @powersync/service-module-mongodb-storage | Patch |
| @powersync/service-module-mssql | Patch |
| @powersync/service-module-mysql | Patch |
| @powersync/service-module-postgres-storage | Patch |
| @powersync/service-module-postgres | Patch |
| test-client | Patch |

@rkistner rkistner requested a review from Sleepful April 7, 2026 14:36
@rkistner rkistner marked this pull request as ready for review April 7, 2026 14:36
@rkistner rkistner mentioned this pull request Apr 8, 2026
Sleepful previously approved these changes Apr 10, 2026
Contributor

@Sleepful Sleepful left a comment

The changes look good.

streamChangesInternal in particular is a really long function with a lot of edge cases. I suggested some places where it could be broken down a bit; if you find more pieces that could be extracted, I think that would make it easier to read through. It is a complex function though, and that complexity cannot be removed. I could not verify that all the if statements for the various edge cases are correct; I think that's the main gap in my review.

Other than that, I like the rawChangeStream change, which uses db.command instead of the MongoDB driver's internal API.

I suggested some changes, but I'm not blocking on them. If you want to apply some of the suggestions, it might be convenient to merge the current PR and do a follow-up PR for any changes.

@rkistner
Contributor Author

@Sleepful Thanks for the review, I implemented some of the cosmetic suggestions.

(As mentioned in another comment) Note that for this specific PR, viewing the diff with "ignore whitespace changes" helps a lot: many of the changes show up in the diff simply because of the change in indentation.

Regardless, that function is indeed long and difficult to follow. We can try to split that up in a future PR.

@Sleepful
Contributor

Ohhh I keep forgetting about the whitespace, good reminder

@rkistner rkistner merged commit d564c23 into main Apr 13, 2026
44 checks passed
@rkistner rkistner deleted the raw-change-streams-2 branch April 13, 2026 11:52
Sleepful added a commit that referenced this pull request Apr 14, 2026
Merges upstream main which includes PR #591 (raw change streams) and
PR #599 (direct BSON Buffer -> JSON conversion).

Auth fix conflicts (types.ts, config.test.ts) resolved — both sides
had the same fix, upstream also added database name decoding.

ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the
MongoDB driver ChangeStream with a custom RawChangeStream using raw
aggregate + getMore. Our Cosmos DB changes need to be re-applied to
the new code structure. Resolved in the next commit.
Sleepful added a commit that referenced this pull request Apr 14, 2026
Re-applied all Cosmos DB changes to the new raw change stream code
structure from PR #591. The raw aggregate approach is better for
Cosmos DB: no lazy ChangeStream init, explicit cursor management,
$changeStream stage built directly in pipeline.

Changes applied to new structure:
- detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal
- getEventTimestamp() adapted to ProjectedChangeStreamDocument type
- Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer)
- Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents
- Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb
- startAtOperationTime fix (startAfter != null)
- Keepalive guard for Cosmos DB resume tokens
- .lte() dedup guard skip on Cosmos DB
- wallTime tracking for replication lag

Verified: 59/59 standard MongoDB tests pass. Cosmos DB tests blocked
by cluster downtime (TLS connection timeout). Code audit of
RawChangeStream.ts found no Cosmos DB compatibility issues — cursor ID
type is automatically fixed by BigInt conversion, postBatchResumeToken
needs empirical verification.
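
One possible reading of the "Pipeline guards" and "Cluster-level aggregate" items above, as a rough sketch. `StreamTarget` and `buildChangeStreamCommand` are hypothetical names and not the code from this commit. The sketch relies on three MongoDB facts: `allChangesForCluster` streams are opened with `aggregate: 1` against the `admin` database, `showExpandedEvents` is an option of the `$changeStream` stage, and `$changeStreamSplitLargeEvent` has to be the last pipeline stage.

```typescript
// Hypothetical sketch; isCosmosDb is assumed to come from some detection step.
interface StreamTarget {
  isCosmosDb: boolean;
  database: string;
  resumeAfter?: unknown;
}

function buildChangeStreamCommand(target: StreamTarget): {
  database: string;
  command: { aggregate: 1; pipeline: Record<string, unknown>[]; cursor: Record<string, unknown> };
} {
  const stage: Record<string, unknown> = {};
  if (target.resumeAfter != null) {
    stage['resumeAfter'] = target.resumeAfter;
  }
  if (target.isCosmosDb) {
    // Cluster-level stream: watch every database through the admin db.
    stage['allChangesForCluster'] = true;
  } else {
    // Guarded option: only requested when not on Cosmos DB.
    stage['showExpandedEvents'] = true;
  }

  const pipeline: Record<string, unknown>[] = [{ $changeStream: stage }];
  if (!target.isCosmosDb) {
    // Guarded stage: must come last in the pipeline when present.
    pipeline.push({ $changeStreamSplitLargeEvent: {} });
  }

  return {
    database: target.isCosmosDb ? 'admin' : target.database,
    // aggregate: 1 targets the database (or cluster) rather than one collection.
    command: { aggregate: 1, pipeline, cursor: {} }
  };
}
```

Building the stage explicitly like this is what makes the guards easy to express: each unsupported option is simply never added to the command.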
Sleepful added a commit that referenced this pull request Apr 14, 2026
Re-applied all Cosmos DB changes to the new raw change stream code
structure from PR #591. The raw aggregate approach is better for
Cosmos DB: no lazy ChangeStream init, explicit cursor management,
$changeStream stage built directly in pipeline.

Changes applied to new structure:
- detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal
- getEventTimestamp() adapted to ProjectedChangeStreamDocument type
- Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer)
- Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents
- Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb
- startAtOperationTime fix (startAfter != null)
- Keepalive guard for Cosmos DB resume tokens
- .lte() dedup guard skip on Cosmos DB
- wallTime tracking for replication lag
- Added changeset for @powersync/service-module-mongodb (minor)

Verified: 59/59 standard MongoDB tests pass.
Cosmos DB cluster is currently down — tests blocked by TLS timeout.
Code audit of RawChangeStream.ts found no compatibility issues:
cursor ID type auto-fixed by BigInt, postBatchResumeToken needs
empirical verification when cluster is back.
Sleepful added a commit that referenced this pull request Apr 14, 2026
Merges upstream main which includes PR #591 (raw change streams) and
PR #599 (direct BSON Buffer -> JSON conversion).

Auth fix conflicts (types.ts, config.test.ts) resolved — both sides
had the same fix, upstream also added database name decoding.

ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the
MongoDB driver ChangeStream with a custom RawChangeStream using raw
aggregate + getMore. Our Cosmos DB changes need to be re-applied to
the new code structure. Resolved in the next commit.

resolve: ChangeStream.ts merge conflicts for raw change streams

Re-applied all Cosmos DB changes to the new raw change stream code
structure from PR #591. The raw aggregate approach is better for
Cosmos DB: no lazy ChangeStream init, explicit cursor management,
$changeStream stage built directly in pipeline.

Changes applied to new structure:
- detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal
- getEventTimestamp() adapted to ProjectedChangeStreamDocument type
- Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer)
- Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents
- Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb
- startAtOperationTime fix (startAfter != null)
- Keepalive guard for Cosmos DB resume tokens
- .lte() dedup guard skip on Cosmos DB
- wallTime tracking for replication lag
- Added changeset for @powersync/service-module-mongodb (minor)

Verified: 59/59 standard MongoDB tests pass.
Cosmos DB cluster is currently down — tests blocked by TLS timeout.
Code audit of RawChangeStream.ts found no compatibility issues:
cursor ID type auto-fixed by BigInt, postBatchResumeToken needs
empirical verification when cluster is back.